home
***
CD-ROM
|
disk
|
FTP
|
other
***
search
/
Chip 2007 January, February, March & April
/
Chip-Cover-CD-2007-02.iso
/
Pakiet bezpieczenstwa
/
mini Pentoo LiveCD 2006.1
/
mpentoo-2006.1.iso
/
livecd.squashfs
/
usr
/
lib
/
python2.4
/
site-packages
/
impacket
/
dcerpc
/
epm.py
< prev
next >
Wrap
Text File
|
2006-05-23
|
7KB
|
215 lines
# Copyright (c) 2003-2006 CORE Security Technologies
#
# This software is provided under under a slightly modified version
# of the Apache Software License. See the accompanying LICENSE file
# for more information.
#
# $Id: epm.py,v 1.5 2006/05/23 21:19:26 gera Exp $
#
import array
import struct
from impacket import ImpactPacket
from impacket import uuid
import dcerpc
import ndrutils
import transport
MSRPC_UUID_PORTMAP ='\x08\x83\xaf\xe1\x1f\x5d\xc9\x11\x91\xa4\x08\x00\x2b\x14\xa0\xfa\x03\x00\x00\x00'
class EPMLookupRequestHeader(ImpactPacket.Header):
OP_NUM = 2
__SIZE = 76
def __init__(self, aBuffer = None, endianness = '<'):
ImpactPacket.Header.__init__(self, EPMLookupRequestHeader.__SIZE)
self.endianness = endianness
self.set_inquiry_type(0)
self.set_referent_id(1)
self.set_referent_id2(2)
self.set_max_entries(1)
if aBuffer: self.load_header(aBuffer)
def get_inquiry_type(self):
return self.get_long(0, self.endianness)
def set_inquiry_type(self, type):
self.set_long(0, type, self.endianness)
def get_referent_id(self):
return self.get_long(4, self.endianness)
def set_referent_id(self, id):
self.set_long(4, id, self.endianness)
def get_obj_binuuid(self):
return self.get_bytes().tolist()[8:8+16]
def set_obj_binuuid(self, binuuid):
assert 16 == len(binuuid)
self.get_bytes()[8:8+16] = array.array('B', binuuid)
def get_referent_id2(self):
return self.get_long(24, self.endianness)
def set_referent_id2(self, id):
self.set_long(24, id, self.endianness)
def get_if_binuuid(self):
return self.get_bytes().tolist()[28:28+20]
def set_if_binuuid(self, binuuid):
assert 20 == len(binuuid)
self.get_bytes()[28:28+20] = array.array('B', binuuid)
def get_version_option(self):
return self.get_long(48, self.endianness)
def set_version_option(self, opt):
self.set_long(48, opt, self.endianness)
def get_handle(self):
return self.get_bytes().tolist()[52:52+20]
def set_handle(self, handle):
assert 20 == len(handle)
self.get_bytes()[52:52+20] = array.array('B', handle)
def get_max_entries(self):
return self.get_long(72, self.endianness)
def set_max_entries(self, num):
self.set_long(72, num, self.endianness)
def get_header_size(self):
return EPMLookupRequestHeader.__SIZE
class EPMRespLookupRequestHeader(ImpactPacket.Header):
__SIZE = 28
def __init__(self, aBuffer = None):
ImpactPacket.Header.__init__(self, EPMRespLookupRequestHeader.__SIZE)
if aBuffer: self.load_header(aBuffer)
def get_handle(self):
return self.get_bytes().tolist()[0:0+20]
def set_handle(self, handle):
assert 20 == len(handle)
self.get_bytes()[0:0+20] = array.array('B', handle)
def get_entries_num(self):
return self.get_long(20, '<')
def set_entries_num(self, num):
self.set_long(20, num, '<')
def get_entry(self):
return ndrutils.NDREntries(self.get_bytes().tostring()[24:-4])
def set_entry(self, entry):
raise Exception, "method not implemented"
def get_status(self):
off = self.get_entry().get_entries_len()
return self.get_long(24 + off, '<')
def set_status(self, status):
off = self.get_entry().get_entries_len()
self.set_long(24 + off, status, '<')
def get_header_size(self):
entries_size = self.get_entry().get_entries_len()
return EPMRespLookupRequestHeader.__SIZE + entries_size
class DCERPCEpm:
endianness = '<'
def __init__(self, dcerpc):
self._dcerpc = dcerpc
def portmap_dump(self, rpc_handle = '\x00'*20):
if self.endianness == '>':
from impacket.structure import unpack,pack
try:
rpc_handle = ''.join(map(chr, rpc_handle))
except:
pass
uuid = list(unpack('<LLHHBB6s', rpc_handle))
rpc_handle = pack('>LLHHBB6s', *uuid)
lookup = EPMLookupRequestHeader(endianness = self.endianness)
lookup.set_handle(rpc_handle);
self._dcerpc.send(lookup)
data = self._dcerpc.recv()
resp = EPMRespLookupRequestHeader(data)
return resp
class EpmEntry:
def __init__(self, uuid, version, annotation, objuuid, protocol, endpoint):
self.__uuid = uuid
self.__version = version
self.__annotation = annotation
self.__objuuid = objuuid
self.__protocol = protocol
self.__endpoint = endpoint
def getUUID(self):
return self.__uuid
def setUUID(self, uuid):
self.__uuid = uuid
def getProviderName(self):
return ndrutils.uuid_to_exe(uuid.string_to_bin(self.getUUID()) + struct.pack('<H', self.getVersion()))
def getVersion(self):
return self.__version
def setVersion(self, version):
self.__version = version
def isZeroObjUUID(self):
return self.__objuuid == '00000000-0000-0000-0000-000000000000'
def getObjUUID(self):
return self.__objuuid
def setObjUUID(self, objuuid):
self.__uuid = objuuid
def getAnnotation(self):
return self.__annotation
def setAnnotation(self, annotation):
self.__annotation = annotation
def getProtocol(self):
return self.__protocol
def setProtocol(self, protocol):
self.__protocol = protocol
def getEndpoint(self):
return self.__endpoint
def setEndpoint(self, endpoint):
self.__endpoint = endpoint
def __str__(self):
stringbinding = transport.DCERPCStringBindingCompose(self.getObjUUID(), self.getProtocol(), '', self.getEndpoint())
s = '['
if self.getAnnotation(): s += "Annotation: \"%s\", " % self.getAnnotation()
s += "UUID=%s, version %d, %s]" % (self.getUUID(), self.getVersion(), stringbinding)
return s
def __cmp__(self, o):
if (self.getUUID() == o.getUUID()
and self.getVersion() == o.getVersion()
and self.getAnnotation() == o.getAnnotation()
and self.getObjUUID() == o.getObjUUID()
and self.getProtocol() == o.getProtocol()
and self.getEndpoint() == o.getEndpoint()):
return 0
else:
return -1 # or +1, for what we care.